package com.example.demo.consumer;

import com.alibaba.fastjson.JSON;
import com.example.demo.common.QueueName;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 消息接收
 *
 * @author xushun
 * @email 505232797@qq.com
 * @date 2019/1/15 14:39
 */
@Component
@Slf4j
public class MsgReceiver {
    private int threadTotal = 100;
    private CountDownLatch countDownLatch = new CountDownLatch(threadTotal);
    private AtomicBoolean isStartThead = new AtomicBoolean(false);
    private AtomicInteger count = new AtomicInteger();

    /**
     * 自动创建的队列
     *
     * @param msg
     */
//    @RabbitListener(queuesToDeclare = @Queue(QueueName.MSG))
//    public void msg1(String msg) {
//        countDownLatch.countDown();
//        if (isStartThead.compareAndSet(false, true)) {
//            Thread thread = new Thread(() -> {
//                try {
//                    countDownLatch.await();
//                    log.info("msg print: {}", msg);
//                } catch (InterruptedException e) {
//                    log.error("countDownLatch.await error", e);
//                }
//            });
//            thread.start();
//        }
//    }
//
//    @RabbitListener(queuesToDeclare = @Queue(QueueName.MSG) )
//    public void msg2(String msg) {
//        count.addAndGet(1);
//        if (count.get() % 100 == 0) {
//            log.info("msg count: {}", count.get());
//            log.info("msg print: {}", msg);
//        }
//    }
    @RabbitListener(queuesToDeclare = @Queue(QueueName.MSG))
    public void msg(String msg, @Headers Map<String, Object> headers, Channel channel) throws IOException {
        log.info("msg print: {}", msg);
        log.info("headers：{}", JSON.toJSONString(headers));
        channel.basicAck(Long.valueOf(headers.get(AmqpHeaders.DELIVERY_TAG).toString()), false);
    }
}
